import wquantiles
import matplotlib
import sklearn.metrics
import sklearn.linear_model
import sklearn.preprocessing
import scipy as sp
import numpy as np
import pandas as pd
import pmdarima as pm
import matplotlib.pyplot as plt
import bootstrapped.bootstrap as bs
%matplotlib inline
# let's be fancy!
from IPython.core.display import HTML
HTML('<link href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">')
# Load the challenge data; all three CSVs are semicolon-separated.
# The solution file is read with names=... (i.e. it presumably ships without
# a header row and shares the training feature layout — verify against data).
X_train = pd.read_csv(r"data/training_dataset.csv", sep=";")
Y_train = pd.read_csv(r"data/training_solution.csv", sep=";", names=X_train.columns)
X_test = pd.read_csv(r"data/test_dataset.csv", sep=";")
# train and test must expose the exact same features in the same order
assert np.array_equal(X_train.columns, X_test.columns)
COLS = X_train.columns.copy()  # canonical feature order, used throughout
X_train.describe() # train set
Y_train.sum(axis=0) # number of failure labelings per feature
X_test.describe() # test set
# compare train and test histograms
for col in COLS:
    plt.figure(figsize=(25, 5))
    # (1) histogram
    plt.subplot(1, 2, 1, label="histogram")
    # shared bin edges over the pooled data so train/test bars are comparable
    bins = np.histogram_bin_edges(np.concatenate([X_train[col], X_test[col]]), bins=50)
    plt.hist(X_train[col], alpha=0.5, bins=bins, label="train")
    plt.hist(X_test[col], alpha=0.5, bins=bins, label="test")
    plt.legend()
    plt.title("{} histogram".format(col))
    # (2) raw values
    plt.subplot(1, 2, 2, label="raw values")
    plt.plot(X_train[col], label="train")
    plt.plot(X_test[col], label="test")
    # shade the labeled failure region of the training series, if any
    errorband = np.where(Y_train[col])[0]
    if len(errorband) > 0:
        plt.fill_between(errorband, np.zeros_like(errorband), X_train[col][errorband],
                         facecolor='yellow', alpha=0.5, label="failures")
    plt.legend()
    plt.title("{} raw values".format(col))
    plt.show()
def visualize_features(x, y=None, name=""):
    """Plot all features of ``x`` stacked in a single axes.

    Each column is min-max scaled to [0, 1] and vertically offset by its
    column index so the series can be compared visually. If a label frame
    ``y`` is given, the flagged samples of each feature are shaded yellow
    underneath the corresponding line.

    Parameters
    ----------
    x : pd.DataFrame
        Feature values, one column per feature.
    y : pd.DataFrame, optional
        0/1 failure labels with the same columns as ``x``.
    name : str
        Suffix appended to the plot title.
    """
    plt.figure(figsize=(20, 1.1 * x.shape[1]))
    for i, col in enumerate(x.columns):
        y_ = sklearn.preprocessing.minmax_scale(x[col]) + i + 0.5
        plt.plot(y_, label=col)
        if y is not None:
            errorband = np.where(y[col])[0]
            if len(errorband) > 0:
                # BUGFIX: np.float was removed in NumPy 1.20+; the builtin
                # float yields the same float64 fill baseline
                plt.fill_between(errorband, np.full_like(errorband, i + 0.5, dtype=float),
                                 y_[errorband], facecolor='yellow', alpha=0.75)
    plt.gca().set_yticks(np.arange(1, len(x.columns) + 1))
    plt.gca().set_xticks(np.arange(len(x), step=20))
    plt.gca().set_axisbelow(True)
    plt.grid(axis="x")
    plt.legend()
    plt.title("feature correlation {}".format(name))
    plt.show()
# stacked overview of all features; the train split also shows failure labels
visualize_features(X_train, Y_train, name="train")
visualize_features(X_test, name="test")
def visualize_correlation_coefficients(x, name="", method="spearman"):
    """Render the absolute pairwise feature correlations of ``x`` as a heatmap.

    Parameters
    ----------
    x : pd.DataFrame
        Feature frame; correlations are computed column-wise.
    name : str
        Suffix appended to the plot title.
    method : str
        Correlation method forwarded to ``DataFrame.corr`` (default "spearman").
    """
    plt.figure(figsize=(10, 10))
    heatmap = plt.imshow(np.abs(x.corr(method)), cmap="viridis")
    axes = plt.gca()
    positions = np.arange(0, len(x.columns))
    # ticks are zero-based pixel positions, labels are one-based feature numbers
    tick_labels = positions + 1
    axes.set_xticks(positions)
    axes.set_yticks(positions)
    axes.set_xticklabels(tick_labels)
    axes.set_yticklabels(tick_labels)
    plt.title("absolute feature correlation {}".format(name))
    plt.colorbar(heatmap)
    plt.show()
# inspect which features carry redundant information in each split
visualize_correlation_coefficients(X_train, "train")
visualize_correlation_coefficients(X_test, "test")
def fix_features_heuristically(x, y):
    """Repair the two known sensor faults in the training data.

    x2 shows a constant offset during its failure window: the faulty samples
    are shifted so their mean matches the overall median of x2. x3 shows a
    linear drift during its failure window: a straight line is fitted over
    the faulty samples and its trend component is subtracted.

    Parameters
    ----------
    x : pd.DataFrame
        Feature values with a default RangeIndex; must contain "x2" and "x3".
    y : pd.DataFrame
        0/1 failure labels aligned with ``x``.

    Returns
    -------
    pd.DataFrame
        A copy of ``x`` with the faulty x2/x3 samples replaced; the input
        frame is left untouched.
    """
    result = x.copy()
    # (1) x2: remove the constant offset of the faulty window
    x2_errorband = np.where(y["x2"])[0]
    if len(x2_errorband) > 0:
        x2_faulty = x["x2"][x2_errorband].values
        x2_offset = np.mean(x2_faulty) - np.median(x["x2"])
        result.loc[x2_errorband, "x2"] = x2_faulty - x2_offset
    # (2) x3: remove the linear trend of the faulty window; np.polyfit with
    # deg=1 yields the same least-squares slope as sklearn's LinearRegression
    x3_errorband = np.where(y["x3"])[0]
    if len(x3_errorband) > 0:
        x3_faulty = x["x3"][x3_errorband].values
        x3_steps = np.arange(len(x3_faulty))
        slope = np.polyfit(x3_steps, x3_faulty, deg=1)[0]
        result.loc[x3_errorband, "x3"] = x3_faulty - x3_steps * slope
    return result
# apply the heuristic repairs and compare original vs. fixed x2/x3 visually
X_train_fixed = fix_features_heuristically(X_train, Y_train)
visualize_features(pd.DataFrame(X_train.loc[:, ["x2", "x3"]]), Y_train, name="train (original x2 + x3)")
visualize_features(pd.DataFrame(X_train_fixed.loc[:, ["x2", "x3"]]), Y_train, name="train (fixed x2 + x3)")
def correlation_confidence_interval(a, b, alpha=0.05, method="spearman"):
# (c) http://onlinestatbook.com/2/estimation/correlation_ci.html
assert method == "pearson" or method == "spearman"
n = len(a)
r, p = sp.stats.pearsonr(a, b) if method == "pearson" else sp.stats.spearmanr(a, b)
z_ = np.arctanh(r) # transform r to fisher z score
stderr = 1.0 / math.sqrt(n - 3) # standard error or normally-distributed sampling distribution
z_crit = sp.stats.norm.ppf(1 - alpha / 2) # 2-tailed z critical value
r_low = np.tanh(z_ - z_crit * stderr)
r_high = np.tanh(z_ + z_crit * stderr)
return r_low, r_high
def visualize_prediction(x_true, x_pred, x_pred_offset=0, name="",
                         error_inner=None, error_outer=None, error_lower=None, error_upper=None,
                         y_mark=None, stretch=False):
    """Plot a predicted series against the ground truth with error bands.

    Parameters
    ----------
    x_true : array-like
        Ground-truth series, drawn dashed in black.
    x_pred : array-like
        Predicted series, drawn in orange starting at ``x_pred_offset``.
    x_pred_offset : int
        Index at which the prediction begins relative to ``x_true``.
    name : str
        Suffix appended to the plot title.
    error_inner, error_outer : array-like or scalar, optional
        Symmetric error bands drawn around the prediction.
    error_lower, error_upper : array-like, optional
        Asymmetric band; mutually exclusive with the symmetric pair.
    y_mark : array-like of bool/0-1, optional
        Per-sample mask of samples to highlight (detected failures).
    stretch : bool
        Triple the figure width and add per-sample minor grid lines.
    """
    plt.figure(figsize=(20 if not stretch else 60, 4))
    x_true_range = np.arange(len(x_true))
    x_pred_range = np.arange(x_pred_offset, len(x_pred) + x_pred_offset)
    plt.plot(x_true_range, x_true, color="black", alpha=0.5, ls="--", label="true")
    plt.plot(x_pred_range, x_pred, color="orange", label="prediction")
    # caller must pick either the symmetric or the asymmetric band style
    assert (error_inner is not None or error_outer is not None) ^ (error_lower is not None or error_upper is not None)
    if error_inner is not None:
        plt.fill_between(x_pred_range, x_pred - error_inner, x_pred + error_inner, color="orange", alpha=0.2)
    if error_outer is not None:
        plt.fill_between(x_pred_range, x_pred - error_outer, x_pred + error_outer, color="orange", alpha=0.2)
    if error_lower is not None or error_upper is not None:
        # a missing side of the asymmetric band defaults to zero width
        error_lower = error_lower if error_lower is not None else np.zeros_like(x_pred)
        error_upper = error_upper if error_upper is not None else np.zeros_like(x_pred)
        plt.fill_between(x_pred_range, x_pred - error_lower, x_pred + error_upper, color="orange", alpha=0.2)
    if y_mark is not None:
        # shade marked samples yellow, with red ribbons at the axes edges
        y_low, y_high = plt.ylim()
        y_chunk = (y_high - y_low) / 50
        plt.fill_between(x_pred_range, y_low, y_high, where=y_mark, facecolor='yellow', alpha=0.25, zorder=-1)
        plt.fill_between(x_pred_range, y_high - y_chunk, y_high, where=y_mark, facecolor='red', alpha=0.5, zorder=-1)
        plt.fill_between(x_pred_range, y_low, y_low + y_chunk, where=y_mark, facecolor='red', alpha=0.5, zorder=-1)
    if stretch:
        minor = matplotlib.ticker.MultipleLocator(base=1)
        plt.gca().xaxis.set_minor_locator(minor)
        major = matplotlib.ticker.MultipleLocator(base=5)
        plt.gca().xaxis.set_major_locator(major)
        plt.grid(which="minor", axis="x", linestyle='--')
        plt.gca().set_axisbelow(True)
    plt.title("feature predictions {}".format(name))
    plt.xlim(x_pred_range[0], x_pred_range[-1])
    plt.legend()
    plt.show()
# example usage:
# visualize_prediction(X_test["x1"], pred, error_lower=pred - err[:, 0], error_upper=err[:, 1] - pred, y_mark=mark, stretch=True)
def ridge_regression(train, column, test=None, test_names=None, ridge_alpha=100, ci_alpha=0.01):
    """Reconstruct one feature from all others with ridge regression.

    Fits a ridge model of ``column`` on the remaining (min-max scaled)
    features of ``train``, bootstraps the 95th percentile of the absolute
    training residual as an error-band width, then plots the model's
    prediction on every given test frame.

    Parameters
    ----------
    train : pd.DataFrame
        Training features including the target column.
    column : str
        The feature to reconstruct from the others.
    test : pd.DataFrame or list of pd.DataFrame, optional
        Frame(s) to predict on; defaults to ``train``.
    test_names : str or list of str, optional
        Plot labels matching ``test``.
    ridge_alpha : float
        L2 regularization strength of the ridge model.
    ci_alpha : float
        Significance level of the bootstrap confidence interval.
    """
    test = test if test is not None else train
    test_names = test_names if test_names is not None else ""
    # normalize both arguments to parallel lists
    if not isinstance(test, (list,)):
        test = [test]
    if not isinstance(test_names, (list,)):
        test_names = [test_names]
    # predictors are all columns except the target
    X, y = train.loc[:, COLS != column], train[column]
    Z = np.column_stack([X, y])  # joint matrix so bootstrap resamples rows of X and y together
    ridge = sklearn.linear_model.Ridge(alpha=ridge_alpha)
    scaler = sklearn.preprocessing.MinMaxScaler().fit(X, y)
    model = ridge.fit(scaler.transform(X), y)
    def _ridge_predict_error(values):
        # 95th percentile of the absolute residual on one (re)sample;
        # last column of ``values`` is the target, the rest are predictors
        X_, y_true = values[:, :-1], values[:, -1]
        y_pred = model.predict(scaler.transform(X_))
        return np.percentile(np.abs(y_true - y_pred), q=95)
    def _ridge_predict_error_bootstrap(samples):
        # the bootstrap library passes either one 2-d sample or a 3-d stack
        if samples.ndim == 2: # single observation
            return [_ridge_predict_error(samples)]
        return [_ridge_predict_error(samples[i, :, :]) for i in range(samples.shape[0])]
    # do bootstrap to get confidence interval
    ci = bs.bootstrap(Z, stat_func=_ridge_predict_error_bootstrap, alpha=ci_alpha)
    # prediction
    for t, n in zip(test, test_names):
        X_, y_ = t.loc[:, COLS != column], t[column]
        prediction = model.predict(scaler.transform(X_))
        visualize_prediction(y_, prediction, error_outer=ci.upper_bound, name="{} {}".format(n, column))
# sanity check: try to reconstruct every feature from the others on both splits
for col in COLS:
    ridge_regression(X_train_fixed, col, test=[X_train, X_test], test_names=["train", "test"])
def identify_outliers(x_errorband, x_true):
    """Return 1 for every sample of ``x_true`` outside its error band, else 0.

    ``x_errorband`` is an (n, 2) array of [lower, upper] bounds per sample.
    """
    lower, upper = x_errorband[:, 0], x_errorband[:, 1]
    inside = np.logical_and(lower <= x_true, x_true <= upper)
    return np.where(inside, 0, 1)
def smooth(x, window_size=3, threshold=0.5):
    """Majority-vote smoothing of a binary series.

    Convolves ``x`` with a normalized box window (zero-padded at the edges)
    and re-binarizes the running mean at ``threshold``.
    """
    kernel = np.full(window_size, 1.0 / window_size)
    averaged = np.convolve(x, kernel, 'same')
    return (averaged >= threshold).astype(int)
def autoarima(trainset, column, testset=None, debug=False, pred_trust=6, err_trust=1, err_shrink=0):
    """Predict ``column`` on ``testset`` with an ensemble of ARIMA models.

    For every other feature in ``COLS``, an auto-tuned non-seasonal ARIMA
    model of ``column`` (with that feature as exogenous regressor) is fitted
    on ``trainset`` and used to forecast the test period. The per-model
    predictions and confidence bands are merged with a weighted median,
    weighting each model by the absolute Spearman correlation of its
    exogenous feature with the target, raised to ``pred_trust`` /
    ``err_trust``. Test samples falling outside the (optionally shrunk)
    merged band are marked as failures via ``identify_outliers`` + ``smooth``.

    Parameters
    ----------
    trainset, testset : pd.DataFrame
        Feature frames sharing the columns in ``COLS``.
    column : str
        The target feature to predict and check.
    debug : bool
        If True, plot every per-dependent prediction and print the weights.
    pred_trust, err_trust : float
        Exponents sharpening the correlation-based weights.
    err_shrink : float
        Fraction of the band width to cut from each side before marking.

    Returns
    -------
    (np.ndarray, np.ndarray, np.ndarray)
        Merged prediction, merged (n, 2) error band, and 0/1 fault mask.
    """
    predictions, errorbands, correlations, weights_pred, weights_err = [], [], [], [], []
    models = {}
    # train n - 1 non-seasonal arima models, predict on test set, store result
    for dependent in COLS:
        if dependent == column:
            continue
        ## TRAIN
        ##
        X, y = trainset[dependent].values.reshape(-1, 1), trainset[column]
        # store empirical correlation and compute effective weight
        corr = sp.stats.spearmanr(X, y)[0]
        weight_pred = np.power(np.abs(corr), pred_trust)
        weight_err = np.power(np.abs(corr), err_trust)
        correlations.append(corr)
        weights_pred.append(weight_pred)
        weights_err.append(weight_err)
        # train arima model
        model = pm.auto_arima(y=y, exogenous=X, seasonal=False,
                              error_action='ignore', suppress_warnings=True, stepwise=True)
        models[dependent] = model
        ## PREDICT with one dependent feature
        ##
        X_, y_ = testset[dependent].values.reshape(-1, 1), testset[column]
        # [obsolete] append test data and predict in-sample
        # offset = X.shape[0]
        # model.update(y_, exogenous=X_)
        # prediction, errorband = model.predict_in_sample(exogenous=X_, start=offset, return_conf_int=True, dynamic=True)
        prediction, errorband = model.predict(exogenous=X_, n_periods=len(testset), return_conf_int=True)
        predictions.append(prediction)
        errorbands.append(errorband)
        if debug:
            visualize_prediction(testset[column], prediction, error_lower=prediction - errorband[:, 0], error_upper=errorband[:, 1] - prediction,
                                 name="for {} with dependent={}, correlation={:.2f}, weight_pred={:.4f}, weight_err={:.4f}".format(column, dependent, corr, weight_pred, weight_err))
    if debug:
        print("correlations", np.abs(correlations))
        print("weights_pred", weights_pred)
        print("weights_err", weights_err)
    # unweighted average
    # pred = np.percentile(np.vstack(predictions), q=50, axis=0)
    # err = np.percentile(np.stack(errorbands), q=50, axis=0)
    # quantile-weighted median over the per-model predictions (last axis = models)
    pred = wquantiles.median(np.vstack(predictions).T, weights_pred)
    err = wquantiles.median(np.stack(errorbands).transpose((1, 2, 0)), weights_err)
    # error shrinkage: narrow the band by err_shrink * width on each side
    iqr = err[:, 1] - err[:, 0]
    err[:, 0] = err[:, 0] + iqr * err_shrink
    err[:, 1] = err[:, 1] - iqr * err_shrink
    # mark outliers
    mark = smooth(identify_outliers(err, testset[column]))
    visualize_prediction(testset[column], pred, error_lower=pred - err[:, 0], error_upper=err[:, 1] - pred, name="{}".format(column), y_mark=mark, stretch=True)
    # final (merged) predictions and errorbands
    return pred, err, mark
# run the ARIMA ensemble per feature on standardized data and collect results
preds, errs, marks = [], [], []
# standardize both splits with the statistics of the (repaired) training data
scaler = sklearn.preprocessing.StandardScaler().fit(X_train_fixed)
X_train_fixed_scaled = X_train_fixed.copy()
X_train_fixed_scaled[:] = scaler.transform(X_train_fixed)
X_test_scaled = X_test.copy()
X_test_scaled[:] = scaler.transform(X_test)
for col in COLS:
    pred, err, mark = autoarima(X_train_fixed_scaled, col, testset=X_test_scaled, debug=False, pred_trust=3, err_trust=3, err_shrink=0.2)
    preds.append(pred)
    errs.append(err)
    marks.append(mark)
# one column per feature: 1 marks a detected fault at that test sample
final = pd.DataFrame(marks).T
print("Number of Faults {}".format(final.sum().sum()))
final.to_csv(r"data/test_solution.csv", header=False, index=False, sep=";")